Created May 28, 2019 12:58
Cassandra CDC idempotent consumer
package com.waitingforcode.cassandra_cdc;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogReadHandler;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.Row;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class CdcCommitLogReadHandler implements CommitLogReadHandler {
public boolean shouldSkipSegmentOnError(CommitLogReadException e) throws IOException {
System.out.println("Should skip segment error ?");
return false;
public void handleUnrecoverableError(CommitLogReadException e) throws IOException {
public void handleMutation(Mutation mutation, int size, int entryLocation, CommitLogDescriptor commitLogDescriptor) {
for (PartitionUpdate partitionUpdate : mutation.getPartitionUpdates()) {
if (partitionUpdate.metadata().ksName.equals("cdc_test")) {
String rowKey = partitionUpdate.metadata().getKeyValidator().getString(partitionUpdate.partitionKey().getKey());
List<String> values = new ArrayList<>();
partitionUpdate.unfilteredIterator().forEachRemaining(partitionRow -> {
Row row = partitionUpdate.getRow((Clustering) partitionRow.clustering());
Iterator<Cell> cells = row.cells().iterator();
Iterator<ColumnDefinition> columns = row.columns().iterator();
while (cells.hasNext() && columns.hasNext()) {
ColumnDefinition definition =;
Cell cell =;
System.out.println("Got new row="+values);

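Cluster setup (CDC must also be enabled on the node, i.e. `cdc_enabled: true` in cassandra.yaml, otherwise no segments are written to cdc_raw):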

ccm remove cdc_cluster
ccm create cdc_cluster -v 3.11.3
ccm populate -n 1
ccm node1 start

Init data

ccm node1 cqlsh
 CREATE KEYSPACE cdc_test  WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1};
 USE cdc_test;
 CREATE TABLE orders (id int, amount double, first_order boolean, PRIMARY KEY(id)) WITH cdc=true;
 INSERT INTO  orders (id, amount, first_order) VALUES (1, 100, true) IF NOT EXISTS;
 INSERT INTO orders (id, amount, first_order) VALUES (2, 100, false) IF NOT EXISTS;
 INSERT INTO orders (id, amount, first_order) VALUES (3, 100, true) IF NOT EXISTS;
 INSERT INTO orders (id, amount, first_order) VALUES (4, 100, false) IF NOT EXISTS;  
 INSERT INTO  orders (id, amount, first_order) VALUES (5, 100, true) IF NOT EXISTS;
 INSERT INTO orders (id, amount, first_order) VALUES (6, 100, false) IF NOT EXISTS;
 INSERT INTO orders (id, amount, first_order) VALUES (7, 100, true) IF NOT EXISTS;
 INSERT INTO orders (id, amount, first_order) VALUES (8, 100, false) IF NOT EXISTS;  
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns=""
// Inspired from
public class Reader {
private final WatchService watcher;
private final Path dir;
private final WatchKey key;
private final CommitLogReader commitLogReader;
private final CdcCommitLogReadHandler commitLogReadHandler;
public Reader() throws IOException {
this.dir = Paths.get("/home/bartosz/.ccm/cdc_cluster/node1/cdc_raw");
watcher = FileSystems.getDefault().newWatchService();
key = dir.register(watcher, ENTRY_CREATE, ENTRY_MODIFY);
commitLogReader = new CommitLogReader();
commitLogReadHandler = new CdcCommitLogReadHandler();
System.setProperty("cassandra.config", "file:///home/bartosz/.ccm/cdc_cluster/node1/conf/cassandra.yaml");
public void processEvents() throws InterruptedException, IOException {
System.out.println("Processing events");
while (true) {
WatchKey aKey = watcher.take();
if (!key.equals(aKey)) {
System.out.println("WatchKey not recognized.");
for (WatchEvent<?> event : key.pollEvents()) {
WatchEvent.Kind<?> kind = event.kind();
if (kind != ENTRY_CREATE) {
// Context for directory entry event is the file name of entry
WatchEvent<Path> ev = (WatchEvent<Path>) event;
Path relativePath = ev.context();
Path absolutePath = dir.resolve(relativePath);
// print out event
System.out.println(event.kind().name() + " for " + absolutePath);
public static void main(String[] args) throws IOException, InterruptedException {
new Reader().processEvents();
private void processCommitLogSegment(Path path) throws IOException {
System.out.println("Processing commitlog segment..."+path.getFileName().toAbsolutePath());
commitLogReader.readCommitLogSegment(commitLogReadHandler, path.toFile(), false);
System.out.println("Commitlog segment processed.");
